package com.dahuan.time;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class Window_EventTime_Watermarks {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // env.setParallelism( 1 );
        //TODO 基于事件时间
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
        //水印之间的间隔（以毫秒为单位）
        env.getConfig().setAutoWatermarkInterval( 100 );

        DataStream<String> inputStream = env.socketTextStream( "localhost", 7777 );
        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
        } )

                //TODO 延迟两秒触发
                .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>( Time.seconds( 2 ) ) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                } );


        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>( "late" ) {
        };

        SingleOutputStreamOperator<SensorReading> resultStream = dataStream.keyBy( "id" )
                .timeWindow( Time.seconds( 5 ) )
                /**
                 * 设置允许元素延迟的时间。
                 * 到达水印后超过指定时间的元素将被删除。
                 * 设置允许的延迟仅对事件时间窗口有效。
                 */
                .allowedLateness( Time.minutes( 1 ) )
                /**
                 * 将迟到的数据发送到给定{@link OutputTag}标识的侧面输出。
                 * 数据被认为是在水印通过窗口末尾之后加上使用{@link #allowedLateness（Time）}设置的允许延迟。
                 * 可以使用窗口操作产生的 {@link SingleOutputStreamOperator＃getSideOutput（OutputTag）}
                 * 和相同的{@link OutputTag}，使用{@link SingleOutputStreamOperator
                 * getSideOutput（OutputTag）}获取最新数据流
                 */
                .sideOutputLateData( outputTag )
                .minBy( "temperature" );

        //TODO 正常输出的数据流
        resultStream.print();

        /**
         * 当resultStream的前面类型是DataStream类型时，下方的resultStream
         * 无法调用getSideOutput方法，只能用多态包装
         *  ((SingleOutputStreamOperator<SensorReading>) resultStream).getSideOutput( outputTag ).print();
         *
         */
        //TODO 获取迟到数据
       resultStream.getSideOutput( outputTag ).print();
       env.execute("Window_EventTime_Watermarks");

    }
}
